// complier: gcc -o srv ft_event.c -lpthread -I ./

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
#include <stdint.h>
#include <netinet/in.h>
#include <strings.h>
#include <string.h>
#include <assert.h>
#include <stddef.h>
#include <signal.h>
#include <stdarg.h>

#include <netinet/tcp.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "ft_event.h"


static int exit_main_loop;


//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// util //////////////////////////////////////////////////////////////////////

int64_t ev_usec_now()
{
	struct timeval now;
	int64_t usec;
	int status;

	status = gettimeofday(&now, NULL);
	if (status < 0) {
        perror("gettimeofday failed");
		return -1;
	}

	usec = (int64_t)now.tv_sec * 1000000LL + (int64_t)now.tv_usec;

	return usec;
}

int64_t ev_msec_now() 
{
	return ev_usec_now() / 1000LL;
}

int ev_socket_nonblocing(int fd)
{
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) {
		return flags;
	}

	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int ev_socket_reuseaddr(int fd)
{
	int reuse;
	socklen_t len;

	reuse = 1;
	len = sizeof(reuse);

	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, len);
}

int ev_socket_nodelay(int fd)
{
	int nodelay;
	socklen_t len;

	nodelay = 1;
	len = sizeof(nodelay);

	return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, len);
}

int ev_socket_keepalive(int fd)
{
	int val = 1;
	return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
}

int ev_socket_listen(const char* ip, uint16_t port)
{
    int fd = -1;
    struct sockaddr_in sa;
    struct hostent* ent;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd <0) {
        perror("srv_fd < 0");
        exit(1);
    }

    if(ev_socket_nonblocing(fd) < 0) {
        perror("fcntl(srv_fd, F_SETFL, flags | O_NONBLOCK) error");
        exit(1);
    }

    if(ev_socket_reuseaddr(fd) < 0) {
        perror("setsockopt(srv_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) error");
        exit(1);
    }

    bzero((char*)&sa, sizeof(sa));

    if(ip == NULL) {
        sa.sin_addr.s_addr = INADDR_ANY;
    } else {
        ent = gethostbyname(ip);
        if(ent == NULL) {
            perror("gethostbyname error");
            exit(0);
        }

        bcopy((char *)ent->h_addr,
                 (char *)&sa.sin_addr.s_addr,
                 ent->h_length);
    }

    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);

    if(bind(fd, (struct sockaddr*)&sa, sizeof(sa)) < 0) {
        perror("bind error");
        exit(1);
    }

    if(listen(fd, SOMAXCONN) < 0) {
        perror("listen error");
        exit(1);
    }
    return fd;
}


void ev_daemon()
{
    pid_t pid;

    /* Fork off the parent process */
    pid = fork();

    /* An error occurred */
    if (pid < 0)
        exit(EXIT_FAILURE);

    /* Success: Let the parent terminate */
    if (pid > 0)
        exit(EXIT_SUCCESS);

    /* On success: The child process becomes session leader */
    if (setsid() < 0)
        exit(EXIT_FAILURE);

    /* Catch, ignore and handle signals */
    //TODO: Implement a working signal handler */
    signal(SIGCHLD, SIG_IGN);
    signal(SIGHUP, SIG_IGN);

    /* Fork off for the second time*/
    pid = fork();

    /* An error occurred */
    if (pid < 0)
        exit(EXIT_FAILURE);

    /* Success: Let the parent terminate */
    if (pid > 0)
        exit(EXIT_SUCCESS);

    /* Set new file permissions */
    umask(0);

    /* Change the working directory to the root directory */
    /* or another appropriated directory */
    chdir("/");

    /* Close all open file descriptors */
    int x;
    for (x = sysconf(_SC_OPEN_MAX); x>0; x--)
    {
        close (x);
    }

}

//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// log //////////////////////////////////////////////////////////////////////

ev_log_t* ev_log_init(int level, char *filename)
{
    int f = open(filename, O_WRONLY | O_APPEND | O_CREAT, 0644);
    if(f == -1) {
        return NULL;
    }

    ev_log_t* log = (ev_log_t*)malloc(sizeof(ev_log_t));
    if(log == NULL) {
        perror("malloc log error");
        close(f);
        exit(0);
    }

    bzero(log, sizeof(*log));
    log->fd = f;
    log->level = level;

    return log;
}

void ev_log_finit(ev_log_t* self)
{
    if(self->fd > 0) {
        close(self->fd);
    }

    free(self);
}

void ev_log_write(ev_log_t* self, int level, const char* fmt, ...)
{

    static const char *levelstr[] = {"NONE", "ERROR" ,"INFO" ,"DEBUG"};


    if(self && level > self->level) {
        return;
    }

    va_list ap;
    char msg[EV_LOG_MAX_LEN];
    char buf[EV_LOG_MAX_LEN];

    va_start(ap, fmt);
    vsnprintf(msg, sizeof(msg), fmt, ap);
    va_end(ap);

    struct timeval tv;
    gettimeofday(&tv, NULL);
    struct tm *t = localtime(&tv.tv_sec);
    size_t n = snprintf(buf, EV_LOG_MAX_LEN, "%d-%02d-%02d %02d:%02d:%02d.%06d [%s][%d] %s\n",
            t->tm_year + 1900,
            t->tm_mon + 1,
            t->tm_mday,
            t->tm_hour,
            t->tm_min,
            t->tm_sec,
            tv.tv_usec,
            levelstr[level],
            getpid(),
            msg);

    if(self && self->fd) {
        printf("%s", buf);
        write(self->fd, buf, n);
    } else {
        fprintf(stderr, "%s", buf);
    }
}

ev_log_t* ev_default_log()
{
    static ev_log_t* log = NULL;
    if(log == NULL) {
        log = ev_log_init(ev_default_config()->log_level, ev_default_config()->logfile);
    }
    return log;
}


//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// array //////////////////////////////////////////////////////////////////////

ev_array_t* ev_array_new(uint32_t alloc_size, size_t elem_size) 
{
	ev_array_t* a;
	assert(alloc_size != 0 && elem_size != 0);

	a = (ev_array_t*)malloc(sizeof(a));
	if(NULL == a) {
		return NULL;
	}

    if(0 != ev_array_init(a, alloc_size, elem_size)) {
        free(a);
        return NULL;
    }

	return a;
}

void ev_array_del(ev_array_t *a)
{
    if(a->elem != NULL) {
    	free(a->elem);
    }
    free(a);
}


int ev_array_init(ev_array_t *a, uint32_t alloc_size, size_t elem_size)
{
    assert(alloc_size != 0 && elem_size != 0);

    a->elem = malloc(alloc_size * elem_size);
    if (a->elem == NULL) {
        return -1;
    }

    a->nelem = 0;
    a->size = elem_size;
    a->nalloc = alloc_size;

    return 0;
}

void ev_array_deinit(ev_array_t *a)
{
    if (a->elem != NULL) {
        free(a->elem);
    }
}

void* ev_array_get(ev_array_t *a, uint32_t idx)
{
    void *elem;
    assert(a->nelem != 0);
    assert(idx < a->nelem);
    elem = (uint8_t *)a->elem + (a->size * idx);
    return elem;
}

void* ev_array_push(ev_array_t* a)
{
    void *elem, *new;
    size_t size;

    if (a->nelem == a->nalloc) {
        size = a->size * a->nalloc;
        new = realloc(a->elem, 2 * size);
        if (new == NULL) {
            return NULL;
        }
        a->elem = new;
        a->nalloc *= 2;
    }

    elem = (uint8_t *)a->elem + a->size * a->nelem;
    a->nelem++;

    return elem;
}

void* ev_array_pop(ev_array_t* a)
{
    void *elem;
    assert(a->nelem != 0);

    a->nelem--;
    elem = (uint8_t *)a->elem + a->size * a->nelem;

    return elem;
}

void ev_array_swap(ev_array_t *a, ev_array_t *b)
{
    ev_array_t tmp;
    tmp = *a;
    *a = *b;
    *b = tmp;
}

void* array_get(ev_array_t *a, uint32_t idx)
{
    void *elem;

    assert(a->nelem != 0);
    assert(idx < a->nelem);

    elem = (uint8_t *)a->elem + (a->size * idx);

    return elem;
}

int ev_array_each(ev_array_t* a, array_each_t func, void* data)
{
	uint32_t i, nelem;
	int status;

    assert(a->nelem != 0);
    assert(func != NULL);

    for (i = 0, nelem = a->nelem; i < nelem; ++i) {
        void *elem = array_get(a, i);
        status = func(elem, data);
        if (status != 0) {
            return status;
        }
    }
    return 0;
}





//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// buffer //////////////////////////////////////////////////////////////////////

ev_buf_t* ev_buf_new(const char* data, size_t size)
{
    ev_buf_t* buf = (ev_buf_t*)malloc(sizeof(ev_buf_t));
    if(buf == NULL) {
        LOG_ERROR("(ev_buf_t*)malloc(sizeof(ev_buf_t)) error");
        return NULL;
    }

    bzero(buf, sizeof(*buf));

    if(size) {
        ev_buf_resize(buf, size);
        ev_buf_append(buf, data, size);
    }

    return buf;
}

ev_buf_t* ev_buf_new2(size_t size)
{
    ev_buf_t* buf = (ev_buf_t*)malloc(sizeof(ev_buf_t));
    if(buf == NULL) {
        LOG_ERROR("(ev_buf_t*)malloc(sizeof(ev_buf_t)) error");
        return NULL;
    }

    bzero(buf, sizeof(*buf));

    if(size > 0) {
        ev_buf_resize(buf, size);
    }

    return buf;
}

void ev_buf_del(ev_buf_t* self)
{
    if(self->buf) {
        free(self->buf);
    }

    free(self);
}

void ev_buf_resize(ev_buf_t* self, size_t n)
{
    if(self->total < n) {
        self->buf = realloc(self->buf, n);
        self->total = n;
    }
}

size_t ev_buf_append(ev_buf_t* self, const char* data, size_t len)
{
    if(!len) {
        return self->len;
    }

    size_t tailFree = self->total - self->offset - self->len;
    if(tailFree < len){
        if(tailFree + self->offset >= len){
            memcpy(self->buf, self->buf + self->offset, self->len);
            self->offset = 0;
        }else{
            ev_buf_resize(self, self->total + len);
        }
    }

    memcpy(self->buf + self->offset + self->len, data, len);
    self->len+= len;

    self->buf[self->offset + self->len] = '\0';

    return self->len;
}

size_t ev_buf_seek(ev_buf_t* self, size_t n)
{
    if(n >= self->len){
        self->len = 0;
        self->offset = 0;
    }else{
        self->offset+= n;
        self->len-= n;
    }

    return self->len;
}

int ev_buf_empty(ev_buf_t* self)
{
    return self->len == 0;
}

void ev_buf_clear(ev_buf_t* self)
{
    self->len = self->offset = 0;
}

size_t ev_buf_size(ev_buf_t* self)
{
    return self->len;
}

const char* ev_buf_lock(ev_buf_t* self)
{
    return self->buf + self->offset;
}

//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// timer manager //////////////////////////////////////////////////////////////////////

ev_timer_mgr_t* ev_timer_mgr_new(size_t slots, int granularity) 
{
	int i;
	ev_timer_mgr_t* mgr;

    assert(slots > 0 && granularity > 0);

	mgr = (ev_timer_mgr_t*)malloc(sizeof(*mgr));
	if(mgr == NULL) {
        LOG_ERROR("ev_timer_mgr_new: mgr = (ev_timer_mgr_t*)malloc(sizeof(*mgr)) error\n");
		return NULL;
	}

    if( 0 != ev_array_init(&mgr->timer_slots, slots, sizeof(ev_timer_head_t)) ) {
        LOG_ERROR("ev_timer_mgr_new: 0 != ev_array_init(mgr->timer_slots, slots, sizeof(ev_timer_t)) error\n");
		free(mgr);
		return NULL;
	}

	for(i=0; i<slots; ++i) {
        ev_timer_head_t* head = ev_array_push(&mgr->timer_slots);
        TAILQ_INIT(head);
	}

	mgr->granularity = granularity;
	mgr->index = 0;
	mgr->slots = slots;
	mgr->update_time = ev_msec_now();

	return mgr;
}

void ev_timer_mgr_del(ev_timer_mgr_t* mgr)
{
	
}

int ev_timer_mgr_start_timer(ev_timer_mgr_t* mgr, ev_timer_t* t, int64_t expire, int repeate)
{
	assert(mgr && t);

	uint32_t cursor;
	uint32_t ticks;
	uint32_t td;

	if(expire < mgr->granularity) {
		ticks = mgr->granularity;
	} else {
		ticks = (expire / mgr->granularity);
	}

	td = (ticks % mgr->slots);

	t->rotation = (ticks / mgr->slots);

	cursor = ((mgr->index + td) % mgr->slots);

	t->repeat = repeate;
	t->expire = expire;
    t->slot = cursor;

    //INSERT_TAIL(t, (ev_timer_t*)ev_array_get(&mgr->timer_slots, cursor));
    TAILQ_INSERT_TAIL((ev_timer_head_t*)ev_array_get(&mgr->timer_slots, cursor), t, node);

	return 0;
}

int ev_tiemr_mgr_stop_timer(ev_timer_mgr_t* mgr, ev_timer_t* t)
{
    //REMOVE_ITEM(t);
    TAILQ_REMOVE((ev_timer_head_t*)ev_array_get(&mgr->timer_slots, t->slot), t, node);
	return 0;
}

int ev_timer_mgr_tick(ev_timer_mgr_t* mgr, int64_t now)
{
	int32_t span;
	int32_t i;
	span = (now - mgr->update_time) / mgr->granularity;

	if(span == 0) {
		return 0;
	}

	for(i =0; i < span; ++i) {
		ev_timer_t *item;
		ev_timer_t *temp;

        ev_timer_head_t swap;
        ev_timer_head_t* head = (ev_timer_head_t*)ev_array_get(&mgr->timer_slots, mgr->index);

        TAILQ_INIT(&swap);

        TAILQ_SWAP(head, &swap, ev_timer_s, node);

//		LIST_REPLACE(head, &timers);
//		LIST_INIT(head);

        TAILQ_FOREACH_SAFE(item, &swap,  node, temp) {
            TAILQ_REMOVE(&swap, item, node);
            if(item->rotation > 0) {
                item->rotation--;
                TAILQ_INSERT_TAIL(head, item, node);
            } else {
                if(item->job.cb) {
                    if(item->job.cb(item->job.ptr) != 0) {
                        continue;
                    }
                }
                if(item->repeat) {
                    ev_timer_mgr_start_timer(mgr, item, item->expire, item->repeat);
                }
            }
        }


		
//		LIST_EACH_SAFE(item, temp, &timers) {
//			REMOVE_ITEM(item);
//			if(item->rotation > 0) {
//				item->rotation--;
//				INSERT_TAIL(item, head);
//			} else {
//				item->job.cb(item->job.ptr);
//				if(item->repeat) {
//					ev_timer_mgr_start_timer(mgr, item, item->expire, item->repeat);
//				}
//			}
//		}
		
		mgr->index = ++mgr->index % mgr->slots;
	}

	mgr->update_time = ev_msec_now();

	return 0;
}



//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// async //////////////////////////////////////////////////////////////////////


ev_async_t* ev_async_new(ev_loop_t* ev)
{
    int fds[2];
    ev_async_t* async = NULL;

    async = (ev_async_t*)malloc(sizeof(ev_async_t));
    if(async == NULL) {
        return NULL;
    }

    bzero(async, sizeof(ev_async_t));

    if(0 != pthread_mutex_init(&async->watch_mtx, NULL) ) {
        perror("ev_async_new: pthread_mutex_init error\n");
        exit(1);
    }
#ifdef USE_KQUEUE
    if( pipe(fds) < 0) {
        perror("ev_async_new: pipe2 error\n");
        pthread_mutex_destroy(&async->watch_mtx);
        free(async);
        return NULL;
    }

    int flags = fcntl(fds[0], F_GETFD);
    flags |= O_NONBLOCK|O_CLOEXEC;

    fcntl(fds[0], F_SETFD, flags);

    flags = fcntl(fds[1], F_GETFD);
    flags |= O_NONBLOCK|O_CLOEXEC;
    fcntl(fds[1], F_SETFD, flags);

#else
    if( pipe2(fds, O_NONBLOCK | O_CLOEXEC) < 0) {
        perror("ev_async_new: pipe2 error\n");
        pthread_mutex_destroy(&async->watch_mtx);
        free(async);
        return NULL;
    }
#endif


    if(0 != ev_array_init(&async->watch_jobs, 1, sizeof(ev_job_t))) {
        LOG_ERROR("ev_async_new: ev_array_init(async->watch_jobs, 64, sizeof(ev_job_t)) error\n");
        pthread_mutex_destroy(&async->watch_mtx);
        close(fds[0]);
        close(fds[1]);
        free(async);
        return NULL;
    }

    async->fd = fds[0];
    async->wfd = fds[1];

    async->type = EV_ASYNC;
    async->ev = ev;
    async->refc = 1;

    ev_event_add_in(ev->evp, async->fd, (ev_stream_t*)async);

    return async;
}

static int ev__async_read(ev_async_t* async)
{

    int size = 0;
    int n;
    int i;
    ev_array_t temp;
    char buf[1024];

    LOG_ENTER_FN;
	

    for(;;) {
        do {
            n = read(async->fd, buf, sizeof(buf));
        }while(n < 0 && errno == EINTR);

        if(n <= 0) {
            break;
        }

        if(n == sizeof(buf)) {
            continue;
        } else {
            break;
        }
    }

    pthread_mutex_lock(&async->watch_mtx);

    if(ev_array_n(&async->watch_jobs) > 0) {
        ev_array_swap(&async->watch_jobs, &temp);
    } else {
        pthread_mutex_unlock(&async->watch_mtx);
        return 0;
    }

    ev_array_init(&async->watch_jobs, 1, sizeof(ev_job_t));

    pthread_mutex_unlock(&async->watch_mtx);

    size = ev_array_n(&temp);

    for(i =0; i < size; ++i) {
        ev_job_t* job = ev_array_get(&temp, i);
        if(job->cb) {
            job->cb(job->ptr);
        }

    }

    ev_array_deinit(&temp);

    return 0;
}

static int ev__async_stop(ev_async_t* async)
{

    LOG_ENTER_FN;

    ev_event_del_in(async->ev->evp, async->fd, (ev_stream_t*)async);

    close(async->fd);
    close(async->wfd);

    return 0;
}


static int ev__async_free(ev_async_t* async)
{
    LOG_ENTER_FN;

    return 0;
}

int ev_async_post(ev_async_t* async, ev_job_t* op)
{
    int r;
    int len = 1;
    char* buf = "";
    ev_job_t* job;

    pthread_mutex_lock(&async->watch_mtx);

    job = ev_array_push(&async->watch_jobs);
    if(job == NULL) {
        LOG_ERROR("ev__async_post_job error\n");
        pthread_mutex_unlock(&async->watch_mtx);
        return -1;
    }

    *job = *op;

    pthread_mutex_unlock(&async->watch_mtx);

    do {
        r = write(async->wfd, buf, len);
    }while (r == -1 && errno == EINTR);

    if (r == len) {
        return 0;
    }

    if (r == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }
    }

    exit(1);

    return 0;
}

// timeout op
void ev_add_timeout_watch(ev_tcp_t* c)
{
    if(ev_default_config()->max_timeout <=0) {
        return;
    }

    c->timeout = (uint32_t)time(NULL) + ev_default_config()->max_timeout;

    TAILQ_INSERT_TAIL(&c->ev->streams_timeout, c, timeout_tqe);

    //INSERT_TAIL(&c->timeout, &c->ev->stream_timeoutQ);
}

void ev_del_timeout_watch(ev_tcp_t* c)
{
    if(ev_default_config()->max_timeout <=0) {
        return;
    }
    //REMOVE_ITEM(&c->timeout);
    TAILQ_REMOVE(&c->ev->streams_timeout, c, timeout_tqe);

}

void ev_reset_timeout_watch(ev_tcp_t* c)
{
    if(ev_default_config()->max_timeout <=0) {
        return;
    }
    //REMOVE_ITEM(&c->timeout);
    ev_del_timeout_watch(c);
    ev_add_timeout_watch(c);
}

//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// tcp //////////////////////////////////////////////////////////////////////



ev_tcp_t* ev_tcp_new(ev_loop_t* ev, int fd)
{
    ev_tcp_t* c = NULL;

    c = (ev_tcp_t*)malloc(sizeof(ev_tcp_t));
    if(c == NULL) {
        LOG_ERROR("client_t* c = malloc(sizeof(struct client_t)) == NULL\n");
        return NULL;
    }
    memset(c, 0, sizeof(ev_tcp_t));

    c->in_buf = ev_buf_new2(ev_default_config()->buf_trunk_size);
    c->out_buf = ev_buf_new2(ev_default_config()->buf_trunk_size);

    c->type = EV_TCP;
    c->fd = fd;
    c->ev = ev;
    c->refc = 1;

    ev_loop_put_stream(ev, (ev_stream_t*)c, 1);

    return c;
}


static int ev__tcp_stop(ev_tcp_t* c)
{
    LOG_ENTER_FN;

    ev_del_timeout_watch(c);

    ev_buf_del(c->in_buf);
    ev_buf_del(c->out_buf);

    ev_event_del_in(c->ev->evp, c->fd, (ev_stream_t*)c);
    ev_event_del_out(c->ev->evp, c->fd, (ev_stream_t*)c);

    close(c->fd);

    if(c->stop_cb) {
        c->stop_cb(c);
    }

	return 0;
}

static int ev__tcp_free(ev_tcp_t* c)
{
    LOG_ENTER_FN;

    if(c->free_cb) {
        c->free_cb(c);
    }

	return 0;
}

static int ev__tcp_write(ev_tcp_t* c)
{
    if(!ev_buf_empty(c->out_buf)) {
        int n = 0;
        const char* data = ev_buf_lock(c->out_buf);
        size_t size = ev_buf_size(c->out_buf);

        ev_reset_timeout_watch(c);

        if(c->write_cb) {
            c->write_cb(c);
        }

        do {
            n = send(c->fd, data, size, 0);
        }while(n < 0 && errno == EINTR);

        if(n > 0) {
            int remain = ev_buf_seek(c->out_buf, n);
            if(remain <= 0) {
                if(ev_event_del_out(c->ev->evp, c->fd, (ev_stream_t*)c) < 0) {
                    ev_stream_del((ev_stream_t*)c);
                    return -1;
                }
            }
        }
        if(n < 0) {
            if(errno == EAGAIN) {
                return 0;
            }
            LOG_ERROR("send data error");
            ev_stream_del((ev_stream_t*)c);
            return -1;
        }
    } else {
        if(ev_event_del_out(c->ev->evp, c->fd, (ev_stream_t*)c) < 0) {
            ev_stream_del((ev_stream_t*)c);
            return -1;
        }
    }
    return 0;
}

static int ev__tcp_read(ev_tcp_t* c)
{
    LOG("begin client_t read\n");

	int n = 0;
    int nrecv = 0;

    char buf[4096] = {0};

    ev_reset_timeout_watch(c);

    do {
        n = recv(c->fd, buf, sizeof(buf), 0);
    }while(n < 0 && errno == EINTR);

    if( n == 0) {
        LOG_INFO("peer close\n");
        ev_stream_del((ev_stream_t*)c);
        return -1;
    }

    if(n < 0) {
        if(errno == EWOULDBLOCK || errno == EAGAIN) {
            return 0;
        } else {
            ev_stream_del((ev_stream_t*)c);
            return -1;
        }
    }

    ev_buf_append(c->in_buf, buf, n);

    if(c->read_cb) {
        c->read_cb(c);
    }

    return nrecv;
}

//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// loop //////////////////////////////////////////////////////////////////////

static void ev__loop_pending_queue_cb(ev_loop_t* ev) 
{
	int size = 0;
	int i;

	ev_array_t temp = NULL_ARRAY;

	pthread_mutex_lock(&ev->pending_mtx);

	if(ev_array_n(&ev->pending_jobs) > 0 ) {
		ev_array_swap(&ev->pending_jobs, &temp);
	} else {
		pthread_mutex_unlock(&ev->pending_mtx);
		return;
	}

	ev_array_init(&ev->pending_jobs, 64, sizeof(ev_job_t));

	pthread_mutex_unlock(&ev->pending_mtx);

	size = ev_array_n(&temp);

	for(i =0; i < size; ++i) {
		ev_job_t* job = ev_array_get(&temp, i);
		job->cb(job->ptr);
	}
	ev_array_deinit(&temp);
}

static void ev__loop_watch_timeout(ev_loop_t* ev)
{
    ev_tcp_t *item, *temp;

    uint32_t now = time(NULL);

    TAILQ_FOREACH_SAFE(item, &ev->streams_timeout, timeout_tqe, temp) {
        if(now >= item->timeout ) {
            TAILQ_REMOVE(&ev->streams_timeout, item, timeout_tqe);
            LOG_INFO("%d time out", item->fd);
            ev_stream_del((ev_stream_t*)item);
        } else {
            break;
        }
    }
}

static void* ev__loop_run_loop(void* ptr)
{
	ev_loop_t* ev  = NULL;

    LOG("run thread: %02x succ", pthread_self());

	ev = (ev_loop_t*)ptr;

    while(!ev->exit) {
        ev__loop_watch_timeout(ev);
        ev__loop_pending_queue_cb(ev);
        ev_timer_mgr_tick(ev->timer_mgr, ev_msec_now());
        ev_event_poll(ev->evp, 100);
    }

    LOG("thread %02x exit\n", pthread_self());
	pthread_exit(NULL);
}


static int on_loop_cb(void* data, unsigned int mask)
{
    ev_stream_t* c = (ev_stream_t*)data;

    if(mask & kEventError) {
        ev_stream_del(c);
        return -1;
    }

    if(mask & kEventRead) {
        if(c->type == EV_TCP) {
            ev_tcp_t* tcp = (ev_tcp_t*)c;
            if(ev__tcp_read(tcp) < 0) {
                ev_stream_del(c);
                return -1;
            }
        } else if(c->type == EV_ASYNC) {
            ev_async_t* async = (ev_async_t*)c;
            if(ev__async_read(async) < 0) {
                ev_stream_del(c);
                return -1;
            }
        }
    }
    if(mask & kEventWrite) {
        if(c->type == EV_TCP) {
            ev_tcp_t* tcp = (ev_tcp_t*)c;
            if(ev__tcp_write(tcp) < 0) {
                ev_stream_del(c);
                return -1;
            }
        } else if(c->type == EV_ASYNC) {

        }
    }
    return 0;
}

ev_loop_t* ev_loop_new()
{
	ev_loop_t* ev = (ev_loop_t*)malloc(sizeof(ev_loop_t));
	if(ev == NULL) {
		return NULL;
	}
	
	memset(ev, 0, sizeof(ev_loop_t));

    //LIST_INIT(&ev->streamQ);
    //LIST_INIT(&ev->stream_timeoutQ);

    TAILQ_INIT(&ev->streams_list);
    TAILQ_INIT(&ev->streams_timeout);

    ev->evp = ev_event_new(32, on_loop_cb);

    if(ev->evp == NULL) {
        perror("create evbase error\n");
    }

	if(ev_array_init(&ev->pending_jobs, 64, sizeof(ev_job_t)) != 0) {
		goto err;
	}

	ev->timer_mgr = ev_timer_mgr_new(256, 100);
	if(ev->timer_mgr == NULL) {
		goto err;
	}

	if(pthread_mutex_init(&ev->pending_mtx, NULL) != 0) {
		goto err;
	}
	
    if(pthread_mutex_init(&ev->stream_mtx, NULL) != 0) {
		goto err;
	}

    if(0 != pthread_create(&ev->cur_thread, NULL, ev__loop_run_loop, (void*)ev)) {
		perror("pthread_create error\n");
		exit(1);
	}
	
	return ev;
	
err:
    perror("ev_loop_new error\n");
    if(ev) {
        if(ev->evp) {
            ev_event_destroy(ev->evp);
        }
        free(ev);
    }

	return NULL;
}

void ev_loop_del(ev_loop_t* ev)
{
    ev_stream_t* c, *tvar;

    TAILQ_FOREACH_SAFE(c, &ev->streams_list, node, tvar) {
        ev_stream_del(c);
    }

    ev_event_destroy(ev->evp);

	pthread_mutex_destroy(&ev->pending_mtx);
    pthread_mutex_destroy(&ev->stream_mtx);
	
	free(ev);
}

void ev_loop_put_pending_queue(ev_loop_t* ev, ev_job_t* op) 
{
	ev_job_t* job;

	pthread_mutex_lock(&ev->pending_mtx);

	job = ev_array_push(&ev->pending_jobs);
	if(job == NULL) {
        LOG_ERROR("ev_loop_put_peeding_queue error\n");
		pthread_mutex_unlock(&ev->pending_mtx);
		return;
	}

	*job = *op;

	pthread_mutex_unlock(&ev->pending_mtx);

}


void ev_loop_put_stream(ev_loop_t* ev, ev_stream_t* c, int put_tail)
{
    pthread_mutex_lock(&ev->stream_mtx);
	if(put_tail) {
        //INSERT_TAIL(c, &ev->streamQ)
        TAILQ_INSERT_TAIL(&ev->streams_list, c, node);
	} else {
        //INSERT_HEAD(c, &ev->streamQ)
        TAILQ_INSERT_HEAD(&ev->streams_list, c, node);
	}
    ++ev->nstreams;
    pthread_mutex_unlock(&ev->stream_mtx);
}

void ev_loop_remove_stream(ev_loop_t* ev, ev_stream_t* c)
{
	assert(c->ev == ev);

    if(c->type == EV_TCP) {
        pthread_mutex_lock(&ev->stream_mtx);
        TAILQ_REMOVE(&ev->streams_list, c, node);
        --ev->nstreams;
        pthread_mutex_unlock(&ev->stream_mtx);
    }
}


int ev_loop_accept(ev_loop_t* ev, int sfd)
{
	int fd;
	do {
        fd = accept(sfd, NULL, NULL);
	} while(fd < 0 && errno == EINTR);

	if(fd < 0) {
        if(errno != EAGAIN) {
            LOG_ERROR("accept fd < 0 error\n");
            return -1;
        }
	}
	else {
        LOG("incomming connect: %d\n", fd);

        if(ev_socket_nonblocing(fd) < 0) {
            LOG_ERROR("connect fd set nonblocking error\n");
            close(fd);
            return -1;
        }

        if(ev_socket_nodelay(fd) < 0) {
            LOG_ERROR("connect fd set nodelay error\n");
            close(fd);
            return -1;
        }

        ev_tcp_t* c = ev_tcp_new(ev, fd);
        if( c == NULL) {
            LOG_ERROR("create client_t error\n");
            close(fd);
            return -1;
        }

        ev_add_timeout_watch(c);
        ev_event_add_in(ev->evp, fd, (ev_stream_t*)c);

	}
	return 0;
}



static int ev__stream_force_del(void* ptr)
{
    ev_stream_t* c = (ev_stream_t*)ptr;
	assert(c->refc == 0);
    if(c->type == EV_TCP) {
        ev__tcp_free((ev_tcp_t*)c);
    } else if(c->type == EV_ASYNC) {
        ev__async_free((ev_async_t*)c);
    }

    ev_loop_remove_stream(c->ev, (ev_stream_t*)c);

	free(c);

    return 1;
}

void ev_stream_del(ev_stream_t* c)
{
	if(__sync_bool_compare_and_swap(&c->stop, 0, 1) ) {
        if(c->type == EV_TCP) {
            ev__tcp_stop((ev_tcp_t*)c);
        } else if(c->type == EV_ASYNC) {
            ev__async_stop((ev_async_t*)c);
        }
	}

	if( __sync_fetch_and_sub(&c->refc, 1) == 1) {
        if(pthread_self() == c->ev->cur_thread) {
            ev__stream_force_del(c);
		} else {
            LOG("ev_loop_put_pending_queue");
            ev_job_t op = { c, ev__stream_force_del };
			ev_loop_put_pending_queue(c->ev, &op);
		}
	}
}

ev_tcp_t* ev_tcp_connect(ev_loop_t* ev, const char* ip, uint16_t port)
{
    ev_tcp_t* tcp = NULL;
    int fd;
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd == -1) {
        LOG_ERROR("create tcp socket error %d", errno);
        return NULL;
    }

    ev_socket_nonblocing(fd);
    ev_socket_reuseaddr(fd);

    struct sockaddr_in sa;

    struct hostent* ent = gethostbyname(ip);
    if(ent == NULL) {
        LOG_ERROR("gethostbyname(%s) error: %d", ip, errno);
        close(fd);
        return NULL;
    }

    bcopy((char *)ent->h_addr,
             (char *)&sa.sin_addr.s_addr,
             ent->h_length);

    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);

    tcp = ev_tcp_new(ev, fd);
    if(tcp == NULL) {
        close(fd);
        return NULL;
    }

    if( connect(fd, (const struct sockaddr*)&sa, sizeof(sa)) != 0 ) {
        if(errno != EINPROGRESS) {
            LOG_ERROR("connect (%s:%d) error: %d", ip, port, errno);
            ev_stream_del((ev_stream_t*)tcp);
            return NULL;
        } else {
            tcp->connecting = 1;
            return tcp;
        }
    }

    tcp->connected = 1;
    return tcp;
}

size_t ev_tcp_write(ev_tcp_t* self, const char* data, size_t len)
{
    if(self->connecting == 1) {
        self->connecting = 0;
        self->connected = 1;
        if(self->connect_cb) {
            self->connect_cb(self);
        }
    }
    if(data && len) {
        if(ev_buf_empty(self->out_buf)) {
            ev_event_add_out(self->ev->evp, self->fd, (ev_stream_t*)self);
        }
        return ev_buf_append( self->out_buf, data, len);
    }
    return 0;
}

static void* work_run(void* arg)
{
    ev_thread_pool_t* pool = (ev_thread_pool_t*)arg;

    while(!pool->exit) {

        ev_job_t* job;

        pthread_mutex_lock(&pool->mtx);

        while(ev_array_n(&pool->jobs) == 0) {
            ++pool->idle_cnt;
            pthread_cond_wait(&pool->cond, &pool->mtx);
            --pool->idle_cnt;
            if(pool->exit) {
                goto exit;
            }
        }

        LOG("run thread pool job");

        job = ev_array_pop(&pool->jobs);
        pthread_mutex_unlock(&pool->mtx);

        if(job->cb) {
            job->cb(job->ptr);
        }
    }
exit:
    return NULL;
}

ev_thread_pool_t* ev_thread_pool_new(int nthread)
{
    int i = 0;
    ev_thread_pool_t* pool = (ev_thread_pool_t*)malloc(sizeof(ev_thread_pool_t) + sizeof(pthread_t) * nthread );

    if(pool == NULL) {
        perror("create thread pool error");
        exit(0);
    }

    bzero(pool, sizeof(ev_thread_pool_t) + sizeof(pthread_t) * nthread);

    pool->init_queue_cnt = 16;

    pthread_mutex_init(&pool->mtx, NULL);
    pthread_cond_init(&pool->cond, NULL);

    ev_array_init(&pool->jobs, pool->init_queue_cnt, sizeof(ev_job_t));

    pool->thread_cnt = nthread;
    for(i = 0; i < nthread; ++i) {
        pthread_create(&pool->thread[i], NULL, work_run, (void*)pool);
    }
    return pool;
}

void ev_thread_pool_post(ev_thread_pool_t* self, ev_job_t* op)
{
    ev_job_t* job;

    LOG("enter ev_thread_pool_post");
    pthread_mutex_lock(&self->mtx);
    job = ev_array_push(&self->jobs);
    *job = *op;
    if(self->idle_cnt > 0) {
        LOG("enter pthread_cond_signal");
        pthread_cond_signal(&self->cond);
    }
    pthread_mutex_unlock(&self->mtx);
}

ev_thread_pool_t* ev_default_thread_pool()
{
    static ev_thread_pool_t* pool = NULL;
    if(pool == NULL) {
        pool = ev_thread_pool_new(ev_default_config()->default_npool);
    }
    return pool;
}

void ev_thread_pool_destroy(ev_thread_pool_t* pool)
{
    int i =0;
    pool->exit = 1;
    pthread_cond_broadcast(&pool->cond);
    for(; i < pool->thread_cnt; ++i) {
        pthread_join(pool->thread[i], NULL);
    }
   pthread_mutex_destroy(&pool->mtx);
   pthread_cond_destroy(&pool->cond);

   free(pool);
}


ev_event_t* ev_event_new(int nevensts, event_cb_pt cb)
{
    int evfd;
    ev_event_t* ev = NULL;
#ifdef USE_KQUEUE
    evfd = kqueue();
#else
    evfd = epoll_create(1024);
#endif

    if(evfd < 0) {
        perror("create evfd error\n");
        exit(0);
    }
    ev = (ev_event_t*)malloc(sizeof(ev_event_t));
    if(ev == NULL) {
        close(evfd);
        return NULL;
    }

    bzero(ev, sizeof(ev_event_t));

#ifdef USE_KQUEUE
    ev->events = (struct kevent*)malloc(sizeof(struct kevent) * nevensts);
#else
    ev->events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * nevensts);
#endif

    if(ev->events == NULL) {
        close(evfd);
        free(ev);
        return NULL;
    }

    ev->evfd = evfd;
    ev->nevents = nevensts;
    ev->ev_cb = cb;

    return ev;
}

void ev_event_destroy(ev_event_t* self) {

    if(self->events) {
        free(self->events);
    }

    close(self->evfd);
    free(self);
}



static int ev__event_op(ev_event_t* self, int fd, int ev, int op, void* data)
{
#ifdef USE_KQUEUE
    struct kevent ke;

    int kop = (op == kEventOpAdd) ? EV_ADD | EV_CLEAR : EV_DELETE;

    if(ev & kEventRead) {
        EV_SET(&ke, fd, EVFILT_READ, kop, 0, 0, data);
        if (kevent(self->evfd, &ke, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }

    if(ev & kEventWrite) {
        EV_SET(&ke, fd, EVFILT_WRITE, kop, 0, 0, data);
        if (kevent(self->evfd, &ke, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }
#else
    struct epoll_event evp;
    int kop = (op == kEventOpAdd) ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
    bzero(&evp, sizeof(evp));

    evp.data.fd = fd;
    evp.data.ptr = data;

    if(ev & kEventRead) {
        evp.events = EPOLLIN;
        if( epoll_ctl(self->evfd, kop,  fd, &evp) < 0) {
            if(errno == EEXIST) {
                if( epoll_ctl(self->evfd, EPOLL_CTL_MOD,  fd, &evp) < 0) {
                    return -1;
                }
            } else {
                return -1;
            }
        }
    }

    if(ev & kEventWrite) {
        evp.events = EPOLLOUT;
        if( epoll_ctl(self->evfd, kop,  fd, &evp) < 0) {
            if(errno == EEXIST) {
                if( epoll_ctl(self->evfd, EPOLL_CTL_MOD,  fd, &evp) < 0) {
                    return -1;
                }
            } else {
                return -1;
            }
        }
    }
#endif
    return 0;
}

int ev_event_add_in(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->recv_active) {
        return 0;
    }

    if( ev__event_op(self, fd, kEventRead, kEventOpAdd, c) < 0 ) {
        LOG_ERROR("ev_event_add_in error");
        return -1;
    } else {
        if(c) {
            c->recv_active = 1;
        }
    }

    return 0;
}

int ev_event_del_in(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->recv_active == 0) {
        return 0;
    }
    if(ev__event_op(self, fd, kEventRead, kEventOpDel, NULL) < 0) {
        LOG_ERROR("ev_event_del_in error");
        return -1;
    } else {
        if(c) {
            c->recv_active = 0;
        }
    }
    return 0;
}

int ev_event_add_out(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->send_active) {
        return 0;
    }
    if( ev__event_op(self, fd, kEventWrite, kEventOpAdd, c) < 0) {
        LOG_ERROR("ev_event_add_out error");
    } else {
        if(c) {
            c->send_active = 1;
        }
    }
    return 0;
}

int ev_event_del_out(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->send_active == 0) {
        return 0;
    }
    if(ev__event_op(self, fd, kEventWrite, kEventOpDel, NULL) < 0) {
        LOG_ERROR("ev_event_del_out error");
    } else {
        if(c) {
            c->send_active = 0;
        }

    }
    return 0;
}

int ev_event_add_all(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->send_active == 1 && c->recv_active == 1) {
        return 0;
    }

    if(c && c->send_active == 0 && c->recv_active == 0) {
        if(ev__event_op(self, fd, kEventAll, kEventOpAdd, c) < 0) {
            LOG_ERROR("ev_event_add_all error");
            return -1;
        } else if(c){
            c->send_active = 1;
            c->recv_active = 1;
            return 0;
        }
    } else if(c && c->send_active == 0) {
        return ev_event_add_out(self, fd, c);
    } else if(c){
        return ev_event_add_in(self, fd, c);
    }

    return -1;
}

int ev_event_resize(ev_event_t* self, int n)
{
    if(self->nevents < n) {
#ifdef USE_KQUEUE
    self->events = (struct kevent*)realloc(self->events, sizeof(struct kevent) * n);
#else
    self->events = (struct epoll_event*)realloc(self->events, sizeof(struct epoll_event) * n);
#endif
    }
    return 0;
}

int ev_event_poll(ev_event_t* self, int timeout)
{
    int n = 0;
    int i = 0;


#ifdef USE_KQUEUE
    do {
        if(timeout < 0) {
            n = kevent(self->evfd, NULL, 0, self->events, self->nevents, NULL);
        } else {
            struct timespec time = { timeout / 1000L, (timeout % 1000L) * 1000000L };
            n = kevent(self->evfd, NULL, 0, self->events, self->nevents, &time);
        }
    }while(n < 0 && errno == EINTR);


    if(n > 0) {
        for(i = 0; i < n; ++i) {
            int mask = 0;
            struct kevent* e = self->events + i;

            if(e->flags & EV_ERROR) {
                if (e->data == EBADF || e->data == EINVAL ||
                    e->data == ENOENT || e->data == EINTR) {
                    continue;
                 }
                mask |= kEventError;
            }

            if(e->filter & EVFILT_READ) mask |= kEventRead;
            if(e->filter & EVFILT_WRITE) mask |= kEventWrite;

            if(self->ev_cb && mask != 0) {
                self->ev_cb(e->udata, mask);
            }
        }

        if(n == self->nevents) {
            ev_event_resize(self, self->nevents * 2);
        }

        return n;
    }

    if(n == 0) {
        if(timeout == -1) {
            LOG_ERROR("ev_event_poll kevent error");
            return -1;
        }
        return 0;
    }

    return -1;

#else
    do {
        n = epoll_wait(self->evfd, self->events, self->nevents, timeout);
    }while(n < 0 && errno == EINTR);


    if( n > 0) {
        for(i = 0; i < n; ++i) {
            int mask = 0;
            struct epoll_event* e = self->events + i;

            if(e->events & EPOLLERR) mask |= kEventError;

            if(e->events & EPOLLIN|EPOLLHUP) mask |= kEventRead;
            if(e->events & EPOLLOUT) mask |= kEventWrite;

            if(self->ev_cb && mask != 0) {
                self->ev_cb(e->data.ptr, mask);
            }
        }

        if(n == self->nevents) {
            ev_event_resize(self, self->nevents * 2);
        }

        return n;
    }
    if(n == 0) {
        if(timeout == -1) {
            LOG_ERROR("ev_event_poll epoll wait error");
            return -1;
        }
        return 0;
    }

    return -1;
#endif

}




//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// config //////////////////////////////////////////////////////////////////////



static inline const char* ev_config_skip_start_space(const char* line) {
    const char* pl = line;
    if(pl == NULL) {
        return NULL;
    }
    while (isspace(*pl)) pl++;
    return pl;
}

static inline const char* ev_config_trim_end_space(const char* line) {
    const char* pl = line;
    if(pl == NULL) {
        return NULL;
    }
    pl = pl + strlen(line) - 1;
    while (isspace(*pl)) pl--;
    return pl;
}

static inline const char* ev_config_skip_comment_empty(const char* line)
{
    const char* pl = ev_config_skip_start_space(line);
    if(pl && *pl == '\0') {
        return NULL;
    }
    if(pl && *pl == '#') {
        return NULL;
    }
    return pl;
}

static inline int ev_config_read_int32(const char* line, const char* key, int32_t* value)
{
    const char* pl = ev_config_skip_start_space(line);

    if(strncasecmp(pl, key, strlen(key)) == 0) {
        pl += strlen(key);
        pl = ev_config_skip_start_space(pl);
        if(*pl++ != '=') {
            return 0;
        }
        *value = atoi(pl);
        return 0;
    }
    return -1;
}


static inline int ev_config_read_string(const char* line, const char* key, char* value, int len)
{
    const char* pl = ev_config_skip_start_space(line);

    if(strncasecmp(pl, key, strlen(key)) == 0) {
        pl += strlen(key);
        pl = ev_config_skip_start_space(pl);
        if(*pl++ != '=') {
            return 0;
        }
        const char* end = ev_config_trim_end_space(line);
        if(len < end - pl) {
            perror("read_string buf size too small");
            exit(0);
        }
        strncpy(value, pl, end-pl);
        return 0;
    }
    return -1;
}


static void ev_config_print(const char* var, ev_config_t* cfg)
{
    LOG_SAFE("%s set:\n"
           "====================================\n"
           "daemon          = %d\n"
           "logfile         = %s\n"
           "pidfile         = %s\n"
           "loglevel        = %d\n"
           "max_nconnect    = %d\n"
           "max_thread      = %d\n"
           "npool           = %d\n"
           "buf trunk size  = %d\n"
           "max timeout     = %d\n"
           "server port     = %d\n"
           "====================================",
           var,
           cfg->daemon,
           cfg->logfile,
           cfg->pidfile,
           cfg->log_level,
           cfg->max_nconnect,
           cfg->max_nthread,
           cfg->default_npool,
           cfg->buf_trunk_size,
           cfg->max_timeout,
           cfg->port
           );
}

static int ev_config_set_default(ev_config_t* cfg)
{
    char cwd[256] = {0};

    bzero(cfg, sizeof(ev_config_t));
    cfg->daemon = 0;

    getcwd(cwd, sizeof(cwd));

    strcat(cfg->logfile, cwd);
    strcat(cfg->logfile, "/a.log");

    cfg->log_level = EV_LOG_DEBUG;
    cfg->max_nconnect = 1024;
    cfg->max_nthread = 2;
    cfg->max_nwork_thread = 1;
    cfg->default_npool = 3;
    cfg->port = 7070;
    cfg->buf_trunk_size = 4096;

    strcat(cfg->pidfile, cwd);
    strcat(cfg->pidfile, "/a.pid");

    ev_config_print("default", cfg);

    return 0;
}


ev_config_t* ev_default_config()
{
    static ev_config_t* config =  NULL;
    if(config == NULL) {
        config = malloc(sizeof(*config));
        if(config == NULL) {
            LOG_SAFE("malloc config error");
        }
        ev_config_set_default(config);
    }
    return config;
}

static int ev_config_load(const char* path)
{
    FILE* f = fopen(path, "r");

    char line[256] = {0};

    if(f == NULL) {
        LOG_SAFE("load %s config error", path);
        return -1;
    }

    ev_config_t* config = ev_default_config();

    do {

        bzero(line, sizeof(line));

        if(!fgets(line, sizeof(line), f)) {
            break;
        }

        const char* pl = ev_config_skip_comment_empty(line);
        if(!pl) {
            continue;
        }

        if(ev_config_read_int32(pl, MAX_THREAD, &config->max_nthread) == 0) {
            if(config->max_nthread < 1) {
                //
            }
            continue;
        }

        if(ev_config_read_int32(pl, SERV_PORT, (int*)&config->port) == 0) {
            continue;
        }

        if(ev_config_read_int32(pl, MAX_TIMEOUT, &config->max_timeout) == 0) {
            continue;
        }

        if(ev_config_read_int32(pl, MAX_CONNECTS, &config->max_nconnect) == 0) {
            continue;
        }

        if(ev_config_read_int32(pl, DAEMON, &config->daemon) == 0) {
            continue;
        }

        if(ev_config_read_string(pl, LOG_FILE_PATH, config->logfile, sizeof(config->logfile)) == 0) {
            continue;
        }

        if(ev_config_read_string(pl, PID_FILE_PATH, config->pidfile, sizeof(config->pidfile)) == 0) {
            continue;
        }

        if(ev_config_read_string(pl, DEFAULT_POOL_SIZE, config->pidfile, sizeof(config->pidfile)) == 0) {
            continue;
        }

        LOG_SAFE("unknow config %s", line);
    }while(1);

    fclose(f);

    ev_config_print("custom", config);

    return 0;
}




static int on_new_connect(void* data, unsigned int mask)
{
    static int curloop = -1;

    if(mask & kEventRead) {
        ev_server_t* server = (ev_server_t*)data;
        ev_loop_accept(server->evloop[++curloop%ev_default_config()->max_nthread], server->sfd);
    }
    return 0;
}

static void ev_sigint_handler(int signum)
{
    LOG_ENTER_FN;

    exit_main_loop = 1;
}

static void ev_init_signal()
{
    struct sigaction saint;

    signal(SIGPIPE, SIG_IGN);

    sigemptyset(&saint.sa_mask);
    saint.sa_flags = 0;
    saint.sa_handler = ev_sigint_handler;

    sigaction(SIGQUIT, &saint, NULL);
    sigaction(SIGTERM, &saint, NULL);
    sigaction(SIGINT, &saint, NULL);
    sigaction(SIGUSR1, &saint, NULL);
}

int ev_server_run(const char* config_filepath, pre_server_run_pt pre_run)
{
    int i;

    if(config_filepath != NULL) {
        if(0 != ev_config_load(config_filepath)) {
            perror("load config file error");
            return -1;
        }
    }

    if(ev_default_config()->daemon == 1) {
        ev_daemon();
    }

    ev_init_signal();

    ev_config_t* config = ev_default_config();

    ev_server_t* server = malloc(sizeof(ev_server_t));
    if(server == NULL) {
        perror("malloc serer error");
        exit(0);
    }

    bzero((void*)server, sizeof(ev_server_t));

    server->evloop = (ev_loop_t**)malloc(config->max_nthread * sizeof(ev_loop_t*));
    if(server->evloop == NULL) {
        perror("malloc ev_loop_t error");
        exit(0);
    }

    for(i =0; i < config->max_nthread; ++i) {
        server->evloop[i] = ev_loop_new();

        if(server->evloop[i] == NULL) {
            perror("gev[i] = evloop_t_new() error");
            exit(1);
        }
    }

    ev_default_thread_pool();

    server->sfd = ev_socket_listen(NULL, config->port);
    if(server->sfd == -1) {
        perror("create server sfd error");
        exit(0);
    }

    server->evbase = ev_event_new(1, on_new_connect);
    ev__event_op(server->evbase, server->sfd, kEventRead, kEventOpAdd, (void*)server);

    if(pre_run) {
        pre_run(server);
    }

    while(!exit_main_loop) {
        ev_event_poll(server->evbase, 200);
    }


    for(i =0; i < config->max_nthread; ++i) {
        server->evloop[i]->exit = 1;
        pthread_join(server->evloop[i]->cur_thread, NULL);
        ev_loop_del(server->evloop[i]);
    }

    close(server->sfd);

   free(server);

   return 0;

}
